You can customize connectors to allow the use of the Interconnector service between them.
Required dependencies
You must first add the following JAR files located in <DATADIR>/javabin/plugin to your project:
• interconnector-service-java-framework.jar
• datainteg-java-commons-queue.jar
Master connector sample code
To allow connection between the connectors and the Interconnecter server, you must first check that an Interconnector server has been deployed in the Administration Console (Deployment > Roles). For more details, see "Configure the Interconnector Server" in Exalead CloudView Connectors Guide.
You must also add two configuration keys to your connector:
• the Interconnector server instance name
• the slave connector name
Below is a sample code for your master connector (JDBC here).
//Master connector //While processing a column containing a path, adds a File System Query (FS Query = a document path) to the message bus
//Instantiation of the Interconnector Service InterConnectorServiceBuilderImpl builder = (InterConnectorServiceBuilderImpl) InterConnectorService.builder(); builder.withDestination(config.slaveConnector); //a configuration key has been added to the connector, to know the //name of the slave connector builder.withQuerySerializer(new FileSystemQuerySerializer()); //the file system query serializer (to xml) supplied by //the JDBC connector builder.withInterconnectorServerInstance(config.interconnectorServerInstanceName); //a configuration key has been //added to the connector, to know the interconnector server instance name the query will be sent to InterConnectorServiceImpl service = builder.build(); //this is time consuming, the service should be instantiated //only once per application (as a Singleton) //End of the instantiation
//Creation of the File System Query FileSystemQuery fileSystemQuery = new FileSystemQuery(); fileSystemQuery.setPath(filePath); //Calls to the service to delete and add a query service.deleteQuery(docURI.toString()); //clear the query before adding the new one service.addQuery(fileSystemQuery, false, true, docURI.toString()); //docURI is the URI of the JDBC document that is currently processed
//Creation of the parent document in the Consolation Box, with type "aggregated" PushAPITransformationHelpers.addArcTo(document, "parent", docURI.toString() + "_REL"); PushAPITransformationHelpers.setType(document, "aggregated");
//Don't forget to close the service when all the processing is done service.closeService();
Slave connector sample code
Below is a sample code for your slave connector (File System here).
//Slave connector
//Enumerates the watched queries InterConnectorServiceBuilderImpl builder = (InterConnectorServiceBuilderImpl) InterConnectorService.builder(); builder.withReceiverName(key.connector.getConnectorName()); //the receiver is the connector itself builder.withQuerySerializer(new FileSystemQuerySerializer()); builder.withInterconnectorServerInstance(config.interconnectorServerInstanceName); InterConnectorServiceImpl service = builder.build(); //this is time consuming, the service should be instantiated only //once per application (as a Singleton) service.pollMessageQueue(); Iterable<ImmutablePair<String, UserPayloadWithUri<String, String>>> tripletIterable = service.getQueries(); Iterator<ImmutablePair<String, UserPayloadWithUri<String, String>>> iteratorQueries = tripletIterable.iterator();
if (iteratorQueries != null && iteratorQueries.hasNext()){ try { final ImmutablePair<String, UserPayloadWithUri<String, String>> triplet = iteratorQueries.next(); UserPayloadWithUri<String, String> queryAndFlags = triplet.getRight(); Query query = service.getSerializer().deserialize(queryAndFlags.getValue()); String checkpoint = triplet.getLeft(); String filepath = query.getUID(); ... FilesystemKey skey = new FilesystemKey(key.connector, filepath, connectorconfig.createFileFromRootPath (filesystemRootPathConfig), false, true); try { service.notifyEndOfQueryJob(checkpoint); } catch (Exception e){ logger.warn("Error while notifying end of query job to storage"); } return (FSKey) skey; } catch (Exception e){ logger.error("Error while adding a root key ",e); return null; } }
//Processes a watched query, i.e. a file system path in this connector //Adding a "parent_uri" meta to link the indexed file system document to the indexed JDBC document try { ArrayList<String> listParentURIs = service.getParentURIFromUID(file.getAbsolutePath()); if (listParentURIs != null && !listParentURIs.isEmpty()){ for (String parentURI : listParentURIs){ collect.addMeta("parent_uri", parentURI); } } service.closeService(); } catch (Exception e ){ logger.debug("Error retrieving parent URI while building PAPI document "+ absolutePath, e); }
//Processes the "parent_uri" metas to create arcs and documents in the consolidation box Collection<Meta> parents_meta = doc.getMetaContainer().getMetaValues("parent_uri"); if (parents_meta != null && !parents_meta.isEmpty()){ Iterator<Meta> iterator = parents_meta.iterator(); while (iterator.hasNext()){ String uri = iterator.next().getValue(); // creating the "relation" intermediate document in the consolidation box, then link it to the child document PushAPITransformationHelpers.createUnmanagedDocument(doc, uri + "_REL", "relation"); PushAPITransformationHelpers.addArcFrom(doc, "rel", uri + "_REL"); } PushAPITransformationHelpers.setType(doc, "child"); }
//Enumerates and processes the deleted queries Iterable<ImmutablePair<String, UserPayload<String, String>>> deleteIterable = service.getDeletedQueries(); Iterator<ImmutablePair<String, UserPayload<String, String>>> iteratorDeletedQueries = tripletIterable.iterator();
You must now configure the Interconnector aggregation processor in the Administration Console with the appropriate document types and arcs defined in your code. For more details, see "Add the Interconnector aggregation processor" in Exalead CloudView Connectors Guide.
You can scan your master connector, then your slave connector.